{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import pandas as pd\n",
    "import multiprocessing as mp\n",
    "\n",
    "\n",
    "def lin_parts(num_atoms, num_threads):\n",
    "    parts = np.linspace(0, num_atoms, min(num_threads, num_atoms) + 1)\n",
    "    parts = np.ceil(parts).astype(int)\n",
    "    return parts\n",
    "\n",
    "\n",
    "def nested_parts(num_atoms, num_threads, descend=False):\n",
    "    parts = [0]\n",
    "    num_threads = min(num_threads, num_atoms)\n",
    "    for num in range(num_threads):\n",
    "        part = 1 + 4 * (parts[-1] ** 2 + parts[-1] + num_atoms * (num_atoms + 1.) / num_threads)\n",
    "        part = 0.5 * (-1 + np.sqrt(part))\n",
    "        parts.append(part)\n",
    "    if descend:\n",
    "        # Computational decreases as index increases\n",
    "        parts = np.cumsum(np.diff(parts)[::-1])\n",
    "        parts = np.append(np.array([0]), parts)\n",
    "    parts = np.round(parts).astype(int)\n",
    "    return parts "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "array([ 0, 10, 14, 17, 20])"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "nested_parts(20, 4)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "array([ 0,  3,  6, 10, 20])"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "nested_parts(20, 4, True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "array([ 0,  5, 10, 15, 20])"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "lin_parts(20, 4)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 20.3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def barrier_tourch(r, width=.5):\n",
    "    t = dict()\n",
    "    p = np.log((1 + r).cumprod(axis=0))\n",
    "    for j in range(r.shape[1]):\n",
    "        for i in range(r.shape[0]):\n",
    "            if np.abs(p[i][j]) >= width:\n",
    "                t[j] = i\n",
    "                continue\n",
    "    return t"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[   0   42   84  125  167  209  250  292  334  375  417  459  500  542\n",
      "  584  625  667  709  750  792  834  875  917  959 1000]\n",
      "CPU times: user 349 ms, sys: 69.2 ms, total: 418 ms\n",
      "Wall time: 880 ms\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "\n",
    "r = np.random.normal(0, 0.01, size=(1000, 10000))\n",
    "num_threads = 24\n",
    "parts = lin_parts(len(r), num_threads)\n",
    "print(parts)\n",
    "jobs = []\n",
    "for i in range(1, len(parts)):\n",
    "    jobs.append(r[:, parts[i-1]:parts[i]])\n",
    "pool = mp.Pool(processes=num_threads)\n",
    "outputs = pool.imap_unordered(barrier_tourch, jobs)\n",
    "out = []\n",
    "for out_ in outputs:\n",
    "    out.append(out_)\n",
    "pool.close()\n",
    "pool.join()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[   0  204  288  353  408  456  500  540  577  612  645  677  707  736\n",
      "  764  790  816  842  866  890  913  935  957  979 1000]\n",
      "CPU times: user 413 ms, sys: 62.2 ms, total: 476 ms\n",
      "Wall time: 870 ms\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "\n",
    "r = np.random.normal(0, 0.01, size=(1000, 10000))\n",
    "num_threads = 24\n",
    "parts = lin_parts(len(r), num_threads)\n",
    "parts = nested_parts(len(r), num_threads)\n",
    "print(parts)\n",
    "jobs = []\n",
    "for i in range(1, len(parts)):\n",
    "    jobs.append(r[:, parts[i-1]:parts[i]])\n",
    "pool = mp.Pool(processes=num_threads)\n",
    "outputs = pool.imap_unordered(barrier_tourch, jobs)\n",
    "out = []\n",
    "for out_ in outputs:\n",
    "    out.append(out_)\n",
    "pool.close()\n",
    "pool.join()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 20.5"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import time\n",
    "from datetime import datetime\n",
    "import sys\n",
    "\n",
    "def expand_call(kwargs):\n",
    "    func = kwargs['func']\n",
    "    del kwargs['func']\n",
    "    out = func(**kwargs)\n",
    "    return out\n",
    "\n",
    "def report_progress(job_idx, num_jobs, time0, task):\n",
    "    msg = [float(job_idx) / num_jobs, (time.time() - time0) / 60.]\n",
    "    msg.append(msg[1] * (1/msg[0] - 1))\n",
    "    time_stamp = str(datetime.fromtimestamp(time.time()))\n",
    "    msg_ = time_stamp + ' ' +str(round(msg[0]*100, 2))+ '% ' + task + ' done after ' + \\\n",
    "        str(round(msg[1], 2)) + ' minutes. Remaining ' + str(round(msg[2],2)) + ' minutes.'\n",
    "    if job_idx < num_jobs:\n",
    "        sys.stderr.write(msg_ + '\\r')\n",
    "    else:\n",
    "        sys.stderr.write(msg_ + '\\n')\n",
    "\n",
    "def process_jobs(jobs, task=None, num_threads=24):\n",
    "    if task is None:\n",
    "        task = jobs[0]['func'].__name__\n",
    "    pool = mp.Pool(processes=num_threads)\n",
    "    outputs = pool.imap_unordered(expand_call, jobs)\n",
    "    out = []\n",
    "    time0 = time.time()\n",
    "    # Execute programs here\n",
    "    for i, out_ in enumerate(outputs, 1):\n",
    "        out.append(out_)\n",
    "        report_progress(i, len(jobs), time0, task)\n",
    "    pool.close()\n",
    "    pool.join()\n",
    "    return out\n",
    "\n",
    "def mp_pandas_obj(func, pd_obj, num_threads=24, mp_batches=1, lin_mols=True, descend=False, **kwargs):\n",
    "    if lin_mols:\n",
    "        parts = lin_parts(len(pd_obj[1]), num_threads * mp_batches)\n",
    "    else:\n",
    "        parts = nested_parts(len(pd_obj[1]), num_threads * mp_batches, descend)\n",
    "    jobs = []\n",
    "    for i in range(1, len(parts)):\n",
    "        job = {pd_obj[0]: pd_obj[1][parts[i-1]: parts[i]], 'func': func}\n",
    "        job.update(kwargs)\n",
    "        jobs.append(job)\n",
    "    if num_threads == 1:\n",
    "        out = process_jobs(jobs)\n",
    "    else:\n",
    "        out = process_jobs(jobs, num_threads=num_threads)\n",
    "        \n",
    "    if isinstance(out[0], pd.Series):\n",
    "        df0 = pd.Series()\n",
    "    elif isinstance(out[0], pd.DataFrame):\n",
    "        df0 = pd.DataFrame()\n",
    "    else:\n",
    "        return out\n",
    "    \n",
    "    for i in out:\n",
    "        df0 = df0.append(i)\n",
    "    df0 = df0.sort_index()\n",
    "    return df0"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2018-06-30 10:08:03.486936 100.0% barrier_tourch done after 0.05 minutes. Remaining 0.0 minutes..\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 376 ms, sys: 132 ms, total: 508 ms\n",
      "Wall time: 3.44 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "\n",
    "r = np.random.normal(0, 0.01, size=(1000, 10000))\n",
    "num_threads = 24\n",
    "pd_obj =  [\"r\", r]\n",
    "out = mp_pandas_obj(barrier_tourch, pd_obj, num_threads)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 344 ms, sys: 147 ms, total: 491 ms\n",
      "Wall time: 12.6 s\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2018-06-30 10:08:23.220720 100.0% barrier_tourch done after 0.2 minutes. Remaining 0.0 minutes.\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "\n",
    "r = np.random.normal(0, 0.01, size=(1000, 10000))\n",
    "num_threads = 24\n",
    "pd_obj =  [\"r\", r]\n",
    "out = mp_pandas_obj(barrier_tourch, pd_obj, num_threads=1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
